1   /*
2    * Copyright (C) 2014 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.base.Preconditions.checkNotNull;
20  
21  import com.google.common.base.Preconditions;
22  import com.google.common.collect.Queues;
23  
24  import java.util.Queue;
25  import java.util.concurrent.Executor;
26  import java.util.logging.Level;
27  import java.util.logging.Logger;
28  
29  import javax.annotation.concurrent.GuardedBy;
30  
31  /**
32   * A special purpose queue/executor that executes listener callbacks serially on a configured
33   * executor.  Each callback task can be enqueued and executed as separate phases.
34   * 
35   * <p>This class is very similar to {@link SerializingExecutor} with the exception that tasks can
36   * be enqueued without necessarily executing immediately.
37   */
38  final class ListenerCallQueue<L> implements Runnable {
39    // TODO(cpovirk): consider using the logger associated with listener.getClass().
40    private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
41  
42    abstract static class Callback<L> {
43      private final String methodCall;
44  
45      Callback(String methodCall) {
46        this.methodCall = methodCall;
47      }
48  
49      abstract void call(L listener);
50      
51      /** Helper method to add this callback to all the queues. */
52      void enqueueOn(Iterable<ListenerCallQueue<L>> queues) {
53        for (ListenerCallQueue<L> queue : queues) {
54          queue.add(this);
55        }
56      }
57    }
58  
59    private final L listener;
60    private final Executor executor;
61  
62    @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque();
63    @GuardedBy("this") private boolean isThreadScheduled;
64  
65    ListenerCallQueue(L listener, Executor executor) {
66      this.listener = checkNotNull(listener);
67      this.executor = checkNotNull(executor);
68    }
69  
70    /** Enqueues a task to be run. */
71    synchronized void add(Callback<L> callback) {
72      waitQueue.add(callback);
73    }
74  
75    /** Executes all listeners {@linkplain #add added} prior to this call, serially and in order.*/
76    void execute() {
77      boolean scheduleTaskRunner = false;
78      synchronized (this) {
79        if (!isThreadScheduled) {
80          isThreadScheduled = true;
81          scheduleTaskRunner = true;
82        }
83      }
84      if (scheduleTaskRunner) {
85        try {
86          executor.execute(this);
87        } catch (RuntimeException e) {
88          // reset state in case of an error so that later calls to execute will actually do something
89          synchronized (this) {
90            isThreadScheduled = false;
91          }
92          // Log it and keep going.
93          logger.log(Level.SEVERE,
94              "Exception while running callbacks for " + listener + " on " + executor, 
95              e);
96          throw e;
97        }
98      }
99    }
100 
101   @Override public void run() {
102     boolean stillRunning = true;
103     try {
104       while (true) {
105         Callback<L> nextToRun;
106         synchronized (ListenerCallQueue.this) {
107           Preconditions.checkState(isThreadScheduled);
108           nextToRun = waitQueue.poll();
109           if (nextToRun == null) {
110             isThreadScheduled = false;
111             stillRunning = false;
112             break;
113           }
114         }
115 
116         // Always run while _not_ holding the lock, to avoid deadlocks.
117         try {
118           nextToRun.call(listener);
119         } catch (RuntimeException e) {
120           // Log it and keep going.
121           logger.log(Level.SEVERE, 
122               "Exception while executing callback: " + listener + "." + nextToRun.methodCall, 
123               e);
124         }
125       }
126     } finally {
127       if (stillRunning) {
128         // An Error is bubbling up, we should mark ourselves as no longer
129         // running, that way if anyone tries to keep using us we won't be
130         // corrupted.
131         synchronized (ListenerCallQueue.this) {
132           isThreadScheduled = false;
133         }
134       }
135     }
136   }
137 }